Python(Boto3)でDynamoDB上のデータの「アップサート」と「条件付き書き込み」を一度にできるのか確認してみた
こんにちは、CX事業本部の若槻です。
案件でIoTデバイスのステート管理をDynamoDB上のテーブルで行うことになり、そのときに以下のような仕様が要求されました。(実際よりは簡潔化しています)
- デバイスは自身のステート変更時にステート情報をAWSに送信してくる。
- テーブル(
state_table
)では以下のAttributionを管理する。deviceId
(String):デバイス識別ID(Partition Key)state
(Boolean):デバイスのステートのブール値deviceTimeStamp
(String):ステート変更時にデバイス側で記録されるタイムスタンプ
- 新しいステートデータが送信されてくるたびに、そのデバイスの
deviceId
のキーのstate
とdeviceTimeStamp
の値をDynamoDB上で更新する。 - 新規デバイスのステートデータは初回のステート送信時に初めて作成されるようにする。
- システムの都合上、このステートデータへの書き込み処理はIoT RuleとLambdaの2経路から行われる。
このとき、Lambda側のステートデータの書き込み処理の実装は以下のように行う必要があります。
- キーがあれば更新、なければ新規作成をする(アップサート)
- キーの更新は、新規データの
deviceTimaStamp
属性の値が既存データより小さければ行わない(条件付き書き込み)
Lambdaの実装はPythonとBoto3を利用するため、Boto3でDynamoDBのデータの「アップサート」と「条件付き書き込み」を一度にできるのか確認してみました。
〜〜2021/09/26追記〜〜
本ブログでは「Boto3のみではできない」と結論付けてTry/Catchを利用していますが、下記によるとできるようです。
なるほどConditionExpressionの中でOR条件すればよかったのね。Boto3で完結する上記方法を使う方が望ましいと思うので、本ブログの記述は別解としてお読みください。
〜〜追記おわり〜〜
確認してみた
DynamoDB.Table
クラスのupdate_item()
メソッド使って、ステートデータの「アップサート」と「条件付き書き込み」を一度に行う以下のようなスクリプトを作成しました。
import boto3 from boto3.resources.base import ServiceResource from botocore.exceptions import ClientError dynamodb_resource = boto3.resource('dynamodb') # ステートデータ device_id = '<デバイスID>' state = '<ステート>' device_time_stamp = '<タイムスタンプ>' def upsate_state(device_id: str, state: bool, device_time_stamp: str, dynamodb_resource: ServiceResource) -> None: table = dynamodb_resource.Table('device_state') try: option = { 'Key': {'deviceId': device_id}, 'ConditionExpression': '#deviceTimeStamp < :device_time_stamp', 'UpdateExpression': 'set #deviceTimeStamp = :device_time_stamp, #state = :state', 'ExpressionAttributeNames': { '#deviceTimeStamp': 'deviceTimeStamp', '#state': 'state' }, 'ExpressionAttributeValues': { ':device_time_stamp': device_time_stamp, ':state': state } } table.update_item(**option) return except ClientError as e: if e.response['Error']['Code'] != 'ConditionalCheckFailedException': raise print("old or not_exist data!!!")
update_item()
は、キーがあれば更新し無ければ新規作成をする「アップサート」の処理を行ってくれます。update_item()
に渡すオプションでConditionExpression
を指定すれば「条件付き書き込み」を行うことができます。値に指定した条件式を満たせば更新し、満たさなければClientError
のConditionalCheckFailedException
を発生させます。- 上記スクリプトでは
ConditionExpression
で「新規データのdeviceTimaStamp
属性の値が既存データより大きい」ことを書き込み条件にしています。
上記スクリプトを用いて、3パターンのステートデータで動作を確認したところ、下記のような結果となりました。
No. | 既存キー | 既存タイムスタンプとの比較 | 書き込み結果 |
---|---|---|---|
1 | あり | 大きい | 成功(DynamoDB上のデータが更新された) |
2 | 小さい | 失敗(ConditionalCheckFailedException 発生) |
|
3 | なし | - | 失敗(ConditionalCheckFailedException 発生) |
No.1と2は想定どおりの結果となりましたが、今回の確認対象のパターンのNo.3では、update_item()
でConditionExpression
を指定した上で、既存キーのないデータの新規作成を行おうとすると、ConditionalCheckFailedException
により書き込みが失敗するという結果となりました。
よって以上の確認により、「アップサート」と「条件付き書き込み」をupdate_item()
による一度の書き込み処理で行うことはできないという結論となりました。
「アップサート」と「条件付き書き込み」のいずれも行う方法
「アップサート」と「条件付き書き込み」のいずれも行いたい場合は、update_item()
に加えほかのメソッドを併用する必要があります。例として以下のような方法が考えられます。
- 1.
get_item()
で事前にキーの存在チェックを行い、存在する場合のみupdate_item()
でConditionExpression
を指定する - 2.
update_item()
でConditionalCheckFailedException
により書き込みが失敗した場合は、既存キーがない場合に限りput_item()
でデータを新規作成する - など
2つ目の方法の場合は以下のようなスクリプトとなります。put_item()
のConditionExpression
でattribute_not_exists(deviceId)
と指定して、キーが存在しない場合にのみデータを新規作成するようにすれば、当初の要求を満たす処理とすることができました。
import boto3 from boto3.resources.base import ServiceResource from botocore.exceptions import ClientError dynamodb_resource = boto3.resource('dynamodb') # ステートデータ device_id = '<デバイスID>' state = '<ステート>' device_time_stamp = '<タイムスタンプ>' def upsate_state(device_id: str, state: bool, device_time_stamp: str, dynamodb_resource: ServiceResource) -> None: table = dynamodb_resource.Table('device_state') try: option = { 'Key': {'deviceId': device_id}, 'ConditionExpression': '#deviceTimeStamp < :device_time_stamp', 'UpdateExpression': 'set #deviceTimeStamp = :device_time_stamp, #state = :state', 'ExpressionAttributeNames': { '#deviceTimeStamp': 'deviceTimeStamp', '#state': 'state' }, 'ExpressionAttributeValues': { ':device_time_stamp': device_time_stamp, ':state': state } } table.update_item(**option) return except ClientError as e: if e.response['Error']['Code'] != 'ConditionalCheckFailedException': raise print('old or not_exist data.') try: item = { 'deviceId': device_id, 'state': state, 'deviceTimeStamp': device_time_stamp } table.put_item( Item=item, ConditionExpression='attribute_not_exists(deviceId)' ) except ClientError as e: if e.response['Error']['Code'] != 'ConditionalCheckFailedException': raise print('old data.')
おわりに
DynamoDBのデータ更新で「アップサート」と「条件付き書き込み」を一度にできるのか確認してみたところ、結果として複数のメソッドを組み合わせる必要があったという話でした。単一のメソッドで出来た方が冗長にならないので良いのですが、現在のところはこの方法が(おそらく)一番有用そうです。
以上